package com.tomato.study.redis.stream.consumer.listener;

import com.tomato.study.redis.stream.core.RedisConstants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.core.StreamOperations;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * Stream listener that logs every record it receives and then acknowledges (ACK) it.
 *
 * <p>The acknowledgement is always issued against stream
 * {@link RedisConstants#REDIS_STREAM1} and consumer group
 * {@link RedisConstants#REDIS_STREAM1_GROUP2}, using the id of the received record.
 *
 * @author lizhifu
 * @date 2022/5/11
 */
@Component
@Slf4j
public class RedisStreamAckListener implements StreamListener<String, MapRecord<String, String, String>> {
    @Resource
    private StringRedisTemplate stringRedisTemplate;

    /**
     * Logs the received record (whole record, id, stream key and body) and then
     * acknowledges it for {@link RedisConstants#REDIS_STREAM1_GROUP2}.
     *
     * <p>If Redis reports that no entry was acknowledged, a warning is logged so the
     * situation is visible instead of being silently ignored.
     *
     * @param message the record that was received
     */
    @Override
    public void onMessage(MapRecord<String, String, String> message) {
        log.info("RedisStreamAckListener 消费者消息监听器：{}", message);
        // Log the individual parts of the received record.
        log.info("RedisStreamAckListener message id {}", message.getId());
        log.info("RedisStreamAckListener stream {}", message.getStream());
        log.info("RedisStreamAckListener body {}", message.getValue());

        // Processing is done: confirm consumption (ACK) so the record leaves the pending list.
        StreamOperations<String, String, String> streamOperations = stringRedisTemplate.opsForStream();
        Long acknowledged = streamOperations.acknowledge(
                RedisConstants.REDIS_STREAM1, RedisConstants.REDIS_STREAM1_GROUP2, message.getId());
        // The result may be null (Spring documents this for pipeline / transaction use),
        // so only a definite zero is treated as "nothing was acknowledged".
        if (acknowledged != null && acknowledged == 0L) {
            log.warn("RedisStreamAckListener ack had no effect, stream {} group {} message id {}",
                    RedisConstants.REDIS_STREAM1, RedisConstants.REDIS_STREAM1_GROUP2, message.getId());
        }
        // Possible follow-up: a scheduled task could keep polling the group's pending list
        // (StreamOperations#pending) to handle dead-letter messages that were never acknowledged.
    }
}
